package drds.data_propagate.parse.table_meta_data;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import drds.data_propagate.binlog_event.secondary.position.EntryPosition;
import drds.data_propagate.driver.packets.server.ColumnPacket;
import drds.data_propagate.driver.packets.server.ResultSetPacket;
import drds.data_propagate.parse.DumperImpl;
import drds.data_propagate.parse.ddl.DruidDdlParser;
import drds.data_propagate.parse.exception.ParseException;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * Resolves {@link TableMetaData} for a schema/table pair.
 *
 * <p>Two modes exist, selected by the constructor:
 * <ul>
 *   <li>When a {@link TableMetaDataStore} is supplied, lookups are delegated to it and a miss is
 *       filled by asking the database for the table's DDL and applying it to the store.</li>
 *   <li>When no store is supplied, a Guava {@link LoadingCache} keyed by the back-quoted full name
 *       (see {@link #getFullName(String, String)}) loads metadata from the database on demand.</li>
 * </ul>
 */
public class TableMetaDataStoreManager {
    // Original column names looked up in the result set of "desc <table>".
    public static final String COLUMN_NAME = "COLUMN_NAME";
    public static final String COLUMN_TYPE = "COLUMN_TYPE";
    public static final String IS_NULLABLE = "IS_NULLABLE";
    public static final String COLUMN_KEY = "COLUMN_KEY";
    public static final String COLUMN_DEFAULT = "COLUMN_DEFAULT";
    public static final String EXTRA = "EXTRA";

    // Statement prefix used to fetch a table's DDL; the back-quoted full name is appended.
    private static final String SHOW_CREATE_TABLE_SQL = "show create table ";
    // Literal sequence that joins schema and table inside a back-quoted full name.
    private static final String QUOTED_NAME_SEPARATOR = "`.`";

    @Setter
    @Getter
    private DumperImpl dumper;
    // Set to true by the constructor when the server reports any global variable named rds_*.
    @Setter
    @Getter
    private boolean isOnRDS = false;
    // Set to true by the constructor when a TableMetaDataStore was supplied.
    @Setter
    @Getter
    private boolean isOnTSDB = false;
    @Setter
    @Getter
    private TableMetaDataStore tableMetaDataStore;
    // Cache keyed by the back-quoted full name (`schema`.`table`). Only created when no
    // TableMetaDataStore is supplied; it stays null otherwise.
    @Setter
    @Getter
    private LoadingCache<String, TableMetaData> tableIdToTableMetaDataCache;

    /**
     * Creates the manager and probes the server for RDS-specific variables.
     *
     * @param dumper             connection used to query the database
     * @param tableMetaDataStore persistent metadata store, or {@code null} to use the
     *                           database-backed cache instead
     */
    public TableMetaDataStoreManager(DumperImpl dumper, TableMetaDataStore tableMetaDataStore) {
        this.dumper = dumper;
        this.tableMetaDataStore = tableMetaDataStore;
        // Without a persistent store, table structures are fetched from the database on demand.
        if (tableMetaDataStore == null) {
            this.tableIdToTableMetaDataCache = CacheBuilder.newBuilder().build(new CacheLoader<String, TableMetaData>() {

                @Override
                public TableMetaData load(String fullName) throws Exception {
                    try {
                        return getTableMetaDataFromDatabase(fullName);
                    } catch (Throwable firstFailure) {
                        // Retry exactly once on a fresh connection.
                        try {
                            TableMetaDataStoreManager.this.dumper.reconnect();
                            return getTableMetaDataFromDatabase(fullName);
                        } catch (IOException e1) {
                            throw new ParseException("fetchNextBinlogEvent failed by tableName meta:" + fullName, e1);
                        }
                    }
                }

            });
        } else {
            isOnTSDB = true;
        }

        try {
            ResultSetPacket resultSetPacket = this.dumper.query("show global variables  like 'rds\\_%'");
            if (resultSetPacket.getValueList().size() > 0) {
                isOnRDS = true;
            }
        } catch (IOException ignored) {
            // Best-effort probe: if the query fails, isOnRDS simply keeps its default (false).
        }
    }

    /**
     * Builds the column list of a table from the result of a "show create table" query.
     *
     * <p>The second value of the result is treated as the CREATE TABLE statement; it is applied to
     * a throw-away {@link MemoryTableMetaDataStore} and the resulting columns are returned.
     *
     * @param schemaName      schema the table belongs to
     * @param tableName       table to look up after applying the DDL
     * @param resultSetPacket result of the "show create table" query
     * @return the parsed columns, or an empty list when the result has fewer than two values
     */
    public static List<ColumnMetaData> parseTableMetaData(String schemaName, String tableName, ResultSetPacket resultSetPacket) {
        if (resultSetPacket.getValueList().size() > 1) {
            String createTableSql = resultSetPacket.getValueList().get(1);
            MemoryTableMetaDataStore memoryTableMetaDataStore = new MemoryTableMetaDataStore();
            memoryTableMetaDataStore.apply(DatabaseTableMetaDataStore.INIT_POSITION, schemaName, createTableSql, null);
            TableMetaData tableMetaData = memoryTableMetaDataStore.find(schemaName, tableName);
            return tableMetaData.getColumnMetaDataList();
        } else {
            return new ArrayList<ColumnMetaData>();
        }
    }

    /**
     * Builds the column list of a table from the result of a "desc <table>" query.
     *
     * <p>The flat value list is read row by row; each row holds one value per column packet, and
     * fields are located by the packets' original column names.
     *
     * @param resultSetPacket result of the "desc" query
     * @return one {@link ColumnMetaData} per row of the result
     */
    public static List<ColumnMetaData> parseTableMetaDataByDesc(ResultSetPacket resultSetPacket) {
        Map<String, Integer> originalColumnNameToIndexMap = new HashMap<String, Integer>(6, 1f);
        int index = 0;
        for (ColumnPacket columnPacket : resultSetPacket.getColumnPacketList()) {
            originalColumnNameToIndexMap.put(columnPacket.getOriginalColumnName(), index++);
        }

        List<String> values = resultSetPacket.getValueList();
        int columnCount = resultSetPacket.getColumnPacketList().size();
        int rowCount = values.size() / columnCount;
        List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>();
        for (int row = 0; row < rowCount; row++) {
            int rowOffset = row * columnCount;
            String columnKey = descValue(values, originalColumnNameToIndexMap, COLUMN_KEY, rowOffset);

            ColumnMetaData columnMetaData = new ColumnMetaData();
            // Intern the column name so identical names share one String instance and save memory.
            columnMetaData.setColumnName(descValue(values, originalColumnNameToIndexMap, COLUMN_NAME, rowOffset).intern());
            columnMetaData.setColumnType(descValue(values, originalColumnNameToIndexMap, COLUMN_TYPE, rowOffset));
            columnMetaData.setNullable(StringUtils.equalsIgnoreCase(descValue(values, originalColumnNameToIndexMap, IS_NULLABLE, rowOffset), "YES"));
            columnMetaData.setPrimaryKey("PRI".equalsIgnoreCase(columnKey));
            columnMetaData.setUniqueIndex("UNI".equalsIgnoreCase(columnKey));
            // Quotes in the default value need special handling.
            columnMetaData.setDefaultValue(DruidDdlParser.unescapeQuotaName(descValue(values, originalColumnNameToIndexMap, COLUMN_DEFAULT, rowOffset)));
            columnMetaData.setExtra(descValue(values, originalColumnNameToIndexMap, EXTRA, rowOffset));

            columnMetaDataList.add(columnMetaData);
        }

        return columnMetaDataList;
    }

    // Reads the value of the named column from the row that starts at rowOffset in the flat list.
    private static String descValue(List<String> values, Map<String, Integer> columnIndexes, String columnName, int rowOffset) {
        return values.get(columnIndexes.get(columnName) + rowOffset);
    }

    /**
     * Loads table metadata straight from the database.
     *
     * <p>First tries "show create table"; if anything in that path fails, falls back to "desc".
     *
     * @param fullName back-quoted full name, normally produced by {@link #getFullName(String, String)}
     * @return metadata built from whichever query succeeded
     * @throws IOException when the fallback "desc" query fails
     */
    private TableMetaData getTableMetaDataFromDatabase(String fullName) throws IOException {
        String[] names = splitFullName(fullName);
        String schemaName = names[0];
        String tableName = names[1];
        try {
            ResultSetPacket resultSetPacket = dumper.query(SHOW_CREATE_TABLE_SQL + fullName);
            return new TableMetaData(schemaName, tableName, parseTableMetaData(schemaName, tableName, resultSetPacket));
        } catch (Throwable e) { // fall back to decoding the "desc" output
            ResultSetPacket resultSetPacket = dumper.query("desc " + fullName);
            return new TableMetaData(schemaName, tableName, parseTableMetaDataByDesc(resultSetPacket));
        }
    }

    /**
     * Splits a full name into schema (index 0) and table (index 1).
     *
     * <p>A name of the form {@code `schema`.`table`} is cut at the first literal {@code `.`}
     * sequence, so dots inside either identifier are preserved. Any other shape falls back to
     * {@code StringUtils.split}, which treats each of the characters {@code `} and {@code .} as a
     * separator.
     */
    private static String[] splitFullName(String fullName) {
        int length = fullName.length();
        if (length >= 2 && fullName.charAt(0) == '`' && fullName.charAt(length - 1) == '`') {
            int separatorIndex = fullName.indexOf(QUOTED_NAME_SEPARATOR, 1);
            int tableStart = separatorIndex + QUOTED_NAME_SEPARATOR.length();
            // Require a non-empty schema and a non-empty table on either side of the separator.
            if (separatorIndex > 1 && tableStart < length - 1) {
                return new String[]{fullName.substring(1, separatorIndex), fullName.substring(tableStart, length - 1)};
            }
        }
        return StringUtils.split(fullName, "`.`");
    }

    /**
     * Returns the metadata of a table.
     *
     * <p>With a {@link TableMetaDataStore}: the store is consulted first; on a miss the table's DDL
     * is fetched from the database (with one reconnect-and-retry), applied to the store at
     * {@code entryPosition}, and the store is consulted again. {@code useCache} is not used in this
     * mode.
     *
     * <p>Without a store: the cache entry is invalidated first when {@code useCache} is false, then
     * the value is read from (and if necessary loaded by) the cache.
     *
     * @param schemaName    schema of the table
     * @param tableName     name of the table
     * @param useCache      whether an already cached value may be returned (cache mode only)
     * @param entryPosition position passed to the store when a fetched DDL is applied
     * @return the metadata; in store mode this is whatever the store returns after the refresh
     * @throws ParseException when querying the database fails with an {@link IOException}
     */
    public TableMetaData getTableMetaData(String schemaName, String tableName, boolean useCache, EntryPosition entryPosition) {
        String fullName = getFullName(schemaName, tableName);
        if (tableMetaDataStore == null) {
            if (!useCache) {
                tableIdToTableMetaDataCache.invalidate(fullName);
            }
            return tableIdToTableMetaDataCache.getUnchecked(fullName);
        }

        TableMetaData tableMetaData = tableMetaDataStore.find(schemaName, tableName);
        if (tableMetaData != null) {
            return tableMetaData;
        }

        // Conditions may have changed since the first snapshot, so the table can be missing from
        // the store; fetch it from the database once and record it.
        try {
            ResultSetPacket resultSetPacket;
            try {
                resultSetPacket = dumper.query(SHOW_CREATE_TABLE_SQL + fullName);
            } catch (Exception e) {
                // Retry exactly once on a fresh connection.
                dumper.reconnect();
                resultSetPacket = dumper.query(SHOW_CREATE_TABLE_SQL + fullName);
            }
            String createTableSql = null;
            // The DDL is read from index 1, so at least two values are required.
            if (resultSetPacket.getValueList().size() > 1) {
                createTableSql = resultSetPacket.getValueList().get(1);
            }
            // Force-overwrite whatever the store currently holds in memory.
            tableMetaDataStore.apply(entryPosition, schemaName, createTableSql, "first");
            return tableMetaDataStore.find(schemaName, tableName);
        } catch (IOException e) {
            throw new ParseException("fetchNextBinlogEvent failed by tableName meta:" + fullName, e);
        }
    }

    /**
     * Drops the cached metadata of one table. Does nothing when a store is configured.
     */
    public void clearTableMetaData(String schemaName, String tableName) {
        if (tableMetaDataStore != null) {
            // Nothing to do for the store: it is cleaned up automatically based on DDL statements.
        } else {
            tableIdToTableMetaDataCache.invalidate(getFullName(schemaName, tableName));
        }
    }

    /**
     * Drops the cached metadata of every table in a schema (case-insensitive match).
     * Does nothing when a store is configured.
     */
    public void clearTableMetaWithSchemaName(String schemaName) {
        if (tableMetaDataStore != null) {
            // Nothing to do for the store: it is cleaned up automatically based on DDL statements.
        } else {
            // Keys built by getFullName look like `schema`.`table`, so match the quoted prefix.
            String quotedPrefix = "`" + schemaName + "`.";
            // Also honour unquoted keys in case a caller populated the cache directly.
            String legacyPrefix = schemaName + ".";
            for (String name : tableIdToTableMetaDataCache.asMap().keySet()) {
                if (StringUtils.startsWithIgnoreCase(name, quotedPrefix)
                        || StringUtils.startsWithIgnoreCase(name, legacyPrefix)) {
                    tableIdToTableMetaDataCache.invalidate(name);
                }
            }
        }
    }

    /**
     * Drops all cached metadata. Does nothing when a store is configured.
     */
    public void clearTableMetaData() {
        if (tableMetaDataStore != null) {
            // Nothing to do for the store: it is cleaned up automatically based on DDL statements.
        } else {
            tableIdToTableMetaDataCache.invalidateAll();
        }
    }

    /**
     * Applies a DDL statement to the configured store.
     *
     * @return the store's result, or {@code true} when no store is configured (the DDL is ignored)
     */
    public boolean apply(EntryPosition entryPosition, String schemaName, String ddl, String extra) {
        if (tableMetaDataStore != null) {
            return tableMetaDataStore.apply(entryPosition, schemaName, ddl, extra);
        } else {
            // No store configured: ignore the DDL.
            return true;
        }
    }

    // Builds the back-quoted full name `schema`.`table` used as cache key and in SQL statements.
    private String getFullName(String schema, String table) {
        StringBuilder sb = new StringBuilder();
        return sb.append('`').append(schema).append('`').append('.').append('`').append(table).append('`').toString();
    }


}
